syntax = "proto3";
import "google/protobuf/wrappers.proto";
service Chat {
rpc chat (stream ChatMessage)
returns (stream google.protobuf.StringValue);
}
message ChatMessage {
string name = 1;
string message = 2;
}
grpc_bidirectional_streaming.proto
and add service definition.$ ballerina grpc --input grpc_bidirectional_streaming.proto --output stubs
grpc_bidirectional_streaming_pb.bal
file is generated inside stubs directory.Proto To Ballerina
to get information on how to use Ballerina Protocol Buffers tool.import ballerina/grpc;
import ballerina/log;
mapgrpc:Caller consMap = {};
@grpc:ServiceConfig {
name: "chat",
clientStreaming: true,
serverStreaming: true
}
service Chat on new grpc:Listener(9090) {
resource function onOpen(grpc:Caller caller) {
log:printInfo(string `${caller.getId()} connected to chat`);
consMap[caller.getId().toString()] = caller;
}
resource function onMessage(grpc:Caller caller, ChatMessage chatMsg) {
grpc:Caller ep;
string msg = string `${chatMsg.name}: ${chatMsg.message}`;
log:printInfo("Server received message: " + msg);
foreach var [callerId, connection] in consMap.entries() {
ep = connection;
grpc:Error? err = ep->send(msg);
if (err is grpc:Error) {
log:printError("Error from Connector: " + err.reason() + " - "
+ <string> err.detail()["message"]);
} else {
log:printInfo("Server message to caller " + callerId
+ " sent successfully.");
}
}
}
resource function onError(grpc:Caller caller, error err) {
log:printError("Error from Connector: " + err.reason() + " - "
+ <string> err.detail()["message"]);
}
resource function onComplete(grpc:Caller caller) {
grpc:Caller ep;
string msg = string `${caller.getId()} left the chat`;
log:printInfo(msg);
var v = consMap.remove(caller.getId().toString());
foreach var [callerId, connection] in consMap.entries() {
ep = connection;
grpc:Error? err = ep->send(msg);
if (err is grpc:Error) {
log:printError("Error from Connector: " + err.reason() + " - "
+ <string> err.detail()["message"]);
} else {
log:printInfo("Server message to caller " + callerId
+ " sent successfully.");
}
}
}
}
grpc_bidirectional_streaming_pb.bal
to the module.service
, copy the stub file to the service
module.grpc_bidirectional_streaming.bal
inside the service
module and add service implementation.$ ballerina build service
$ ballerina run target/bin/service.jar
import ballerina/grpc;
import ballerina/io;
import ballerina/runtime;
int total = 0;
public function main() {
ChatClient chatEp = new("http://localhost:9090");
grpc:StreamingClient ep;
var res = chatEp->chat(ChatMessageListener);
if (res is grpc:Error) {
io:println("Error from Connector: " + res.reason() + " - "
+ <string> res.detail()["message"]);
return;
} else {
io:println("Initialized connection sucessfully.");
ep = res;
}
ChatMessage mes = { name: "Sam", message: "Hi " };
grpc:Error? connErr = ep->send(mes);
if (connErr is grpc:Error) {
io:println("Error from Connector: " + connErr.reason() + " - "
+ <string> connErr.detail()["message"]);
}
runtime:sleep(6000);
grpc:Error? result = ep->complete();
if (result is grpc:Error) {
io:println("Error in sending complete message", result);
}
}
service ChatMessageListener = service {
resource function onMessage(string message) {
io:println("Response received from server: " + message);
}
resource function onError(error err) {
io:println("Error reported from server: " + err.reason() + " - "
+ <string> err.detail()["message"]);
}
resource function onComplete() {
io:println("Server Complete Sending Responses.");
}
};
grpc_bidirectional_streaming_pb.bal
to the module.client
, copy the stub file to the client
module.grpc_bidirectional_streaming_client.bal
inside the client
module and add client implementation.$ ballerina build client
$ ballerina run target/bin/client.jar